[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました

[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました

Clock Icon2022.12.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

前回は、初めて触れる、MQTT v5 のイメージアップのため、「リクエスト・レスポンス」のパターンを実装してみました。

今回は、ユーザープロパティの実装を試して見たいと思います

※ 現時点(2022/12/01)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。

2022/12/05 追記 Python SDK が開発者プレビューで MQTT v5 対応となりました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python

2 User Property

ユーザープロパティは、MQTT v5で拡張されたプロパティの1つとして定義されたもので、UTF-8 キーと値のペアで自由に定義することができます。

また、ユーザー プロパティは配列となっており、最大メッセージサイズの範囲内で、複数追加することができます。

数値 名称 概要 データ型 コマンド
0x26 User Property 任意のユーザ定義(複数定義可能) UTF-8 string key-value pair CONNECT, CONNACK, PUBLISH, Will, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH
0x17 Request Problem Information 障害時合にユーザープロパティを送信するかどうか Byte CONNECT

※ 0x17 については、今回実装しておりません。

ユーザープロパティを使用すると、独自のメタデータを MQTT メッセージに追加できることになり、MQTT 3.1 における拡張性の欠如を補うものとなります。

なお、文字列であるため、バイナリデータなどは、変換が必要である点に注意が必要です。

3 実装例

以下が、実装したコードです。

プログラムは IoT Core に Connect 及び、Subscribe した後、メッセージを 3 回 Publish しています。 3回のPublish は、それぞれ、テキスト、JSON、バイナリのデータをユーザープロパティをセットしています。 Publish されたメッセージは、自分自身で受け取って、格納されたユーザープロパティの内容を表示しています。

import ssl
import time
import os
import json
import base64
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}

def on_publish(client, userdata, mid):
    print("on_publish")

def on_connect(client, userdata, flags, reasonCode,properties=None):
    print("on_connect flags:{} properties:{} reasonCode: {}".format(flags, properties, reasonCode))

def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
    print("on_subscribe")

def on_disconnect(client, userdata, rc,properties):
    print('on_disconnect {} {} {} {}'.format(client, userdata, rc, properties))

def on_unsubscribe(client, userdata, mid, properties, reasonCodes):
    print('on_unsubscribe')

def on_message(client, userdata, message):
    msg=str(message.payload.decode("utf-8"))
    userProperty = message.properties.UserProperty
    for data in userProperty:
        (key,value) = data
        if(key == "base64Data"):
            value = base64.b64decode(value)
        print("key:{} value:{}".format(key,value))

def main():

    topic = "sensor/device01"

    clients = []
    for id in ["client_sub","client_pub"]:
        client = mqtt.Client(id, protocol = mqtt.MQTTv5)
        client.tls_set(certs["cafile"],
                certfile=certs["certfile"],
                keyfile=certs["keyfile"],
                cert_reqs=ssl.CERT_REQUIRED,
                tls_version=ssl.PROTOCOL_TLSv1_2,
                ciphers=None)
        client.on_connect = on_connect
        client.on_message = on_message
        client.on_disconnect = on_disconnect
        client.on_subscribe = on_subscribe
        client.on_publish = on_publish
        client.connect(endpoint, port, properties = None)
        client.loop_start()
        clients.append(client)

    clients[0].subscribe(topic, qos=0)
    time.sleep(3)

    print("send text data.")
    properties = Properties(PacketTypes.PUBLISH)
    properties.UserProperty = [("textData", "hello world.")]
    clients[1].publish(topic,"MESSAGE_002",properties = properties)
    time.sleep(3)

    print("send json data.")
    properties = Properties(PacketTypes.PUBLISH)
    jsonData = {
        "vendor":"classmethod",
        "version":1.0,
        "model":"ABC"
    }
    properties.UserProperty = [("jsonData", json.dumps(jsonData))]
    clients[1].publish(topic,"MESSAGE_001",properties = properties)
    time.sleep(3)

    print("send binary data")
    properties = Properties(PacketTypes.PUBLISH)
    binaryData = b"1234567890"
    properties.UserProperty = [("base64Data", base64.b64encode(binaryData))]
    clients[1].publish(topic,"MESSAGE_002",properties = properties)
    time.sleep(3)

    clients[0].unsubscribe(topic)
    for client in clients:
        client.disconnect()

if __name__ == "__main__":
    main()

4 最後に

ユーザープロパティは、HTTP ヘッダーの概念に非常に似ています。

どういった場面で利用できるかを考えてみると・・・

例えば、バイナリのファイルを転送する場合、ファイル名などのメタ情報をユーザープロパティに配置すれば、payload は、バイナリファイルそのものでよくなり、エンコード・デコードのオーバーヘッドや、転送量の削減が可能になるでしょう。

また、デバイスが多数のベンダーで構成されていて、payload の形式が異なる場合など、ユーザープロパティに、そのベンダー情報を置いておけば、受け取った側でパースする際に、無駄な判定コードが不要となるでしょう。

広範囲に展開されたデバイスの配置場所などを既存の Payload 仕様に変更を加えることなく追加することも有効かも知れません。

ユーザープロパティは、技術的には非常にシンプルな機能ですが、無限の可能性を秘めていると言えるかも知れません。

この後も、MQTT v5で利用可能になった機能について、順次確認を進めたいと思います。

5 参考リンク


MQTT Version 5.0 のリクエスト・レスポンスパターンを試してみた
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
MQTT 5 supported features
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.